package taskprocess

import (
	"sync"
	"time"

	"acs/comet/config"

	"git.oschina.net/chaos.su/go-nsq"
	dataLogger "github.com/sunreaver/logger"
	"go.uber.org/zap"
)

var eventLogger *zap.SugaredLogger
var mongoLogger *zap.SugaredLogger

var monitor *TaskMonitor
var monitorCreateLock = new(sync.Mutex)

// TaskMonitor monitor task on the task queue servers.
type TaskMonitor struct {
	sync.RWMutex
	msgConsumer *nsq.Consumer
	handler     *AppPatchNotifier
}

// NewTaskMonitor create a new task monitor.
// Once the montior has been created, it will begin watching the task queue and processing the tasks.
func NewTaskMonitor(newCfg Config) (*TaskMonitor, error) {
	monitorCreateLock.Lock()
	defer monitorCreateLock.Unlock()
	if monitor != nil {
		return monitor, nil
	}

	eventLogger = dataLogger.GetSugarLogger("HandleEvent.log")
	mongoLogger = dataLogger.GetSugarLogger("mongo.log")
	if newCfg.ChannelName == "" {
		newCfg.ChannelName = DefaultTaskChannelName
	}

	if newCfg.TopicName == "" {
		newCfg.TopicName = DefaultTaskTopicName
	}
	cfg = &newCfg
	qConfig := nsq.NewConfig()
	qConfig.DialTimeout = time.Millisecond * DefaultTaskServerConnTimeout
	qClient, err := nsq.NewConsumer(cfg.TopicName, cfg.ChannelName, qConfig)
	if err != nil {
		return nil, err
	}
	err = qClient.ConnectToNSQLookupds(cfg.QueueLookupdAddrs)
	if err != nil {
		return nil, err
	}
	handler := &AppPatchNotifier{
		stopChan:   make(chan bool, config.DefaultConcurrencyTaskProcessors),
		cocurrency: uint32(config.DefaultConcurrencyTaskProcessors),
	}
	qClient.AddConcurrentHandlers(handler, config.DefaultConcurrencyTaskProcessors)

	m := &TaskMonitor{
		msgConsumer: qClient,
		handler:     handler,
	}
	initPersistStorage()
	go taskLoopCheck()
	return m, nil
}

// SetProcessingCurrency set the cocurrent task processor numbers.
func (tm *TaskMonitor) SetProcessingCurrency(n uint32) {
	tm.Lock()
	defer tm.Unlock()
	dn := n - tm.handler.cocurrency
	if dn > 0 {
		tm.msgConsumer.AddConcurrentHandlers(tm.handler, int(dn))
	} else if dn < 0 {
		tm.handler.StopN(-dn)
	}
	if dn != 0 {
		tm.msgConsumer.ChangeMaxInFlight(int(n))
	}
}
